[HUDI-5857] Insert overwrite into bucket table would generate new file group id#8072
Conversation
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
Outdated
Show resolved
Hide resolved
|
Thanks @beyond1920 overall looks good. |
KnightChess
left a comment
There was a problem hiding this comment.
how about remove tagLocation when use insert overwrite op, like #8073 , I think this can also solve this quesion too. And the scenario in insert overwrite I think no need to tagLocation, right?
| protected Partitioner getPartitioner(WorkloadProfile profile) { | ||
| return table.getStorageLayout().layoutPartitionerClass() | ||
| .map(c -> getLayoutPartitioner(profile, c)) | ||
| .map(c -> c.equals(HoodieLayoutConfig.SIMPLE_BUCKET_LAYOUT_PARTITIONER_CLASS_NAME) |
There was a problem hiding this comment.
does consistentBucketIndex will not cause the same problem?
There was a problem hiding this comment.
No, consistentBucketIndex works correctly, it would generate different file ids.
There was a problem hiding this comment.
@KnightChess Thanks for your advice.
Remove tagLocation could also fixed this problem. However I prefer to fix this problem by generate new file ids because:
- Remove tag location would change stats, for example, miss updated count
- It's better to keep same behavior for all index types instead of only remove tag location in insert overwrite for bucket index table.
But remove tag location is a good improvement to speed up insert overwrite. I would created a new JIRA to track this issue. Maybe using bulk insert to do insert overwrite for all index typed. WDYT?
There was a problem hiding this comment.
@beyond1920 I read consistentBucketIndex implementation, found it must tag incomming records to allocation fgId, so #8073 will cause some quesion
There was a problem hiding this comment.
No, consistentBucketIndex works correctly, it would generate different file ids.
consistentBucketIndex can not work correctly, change the ut case
There was a problem hiding this comment.
emm, sorry for the hurry response before.
Thank you for point it out.
I need to spend more time to get familiar with ConsistentBucketIndex. I would response ASAP.
| } | ||
|
|
||
| test("Test Insert Overwrite") { | ||
| test("Test Insert Overwrite for bucket ") { |
There was a problem hiding this comment.
add test for consistentBucketIndex
There was a problem hiding this comment.
ConsistentBucketIndex works correctly, it would generate different file ids.
However, I add the test cases for consitentBucketIndex too.
9a4747e to
5fa9c60
Compare
| // Insert overwrite static partition | ||
| spark.sql( | ||
| s""" | ||
| | insert overwrite table $tableName partition(dt = '2021-01-05') |
There was a problem hiding this comment.
this will create a new parquet file with the same prefix against log file, but something diff in fgId suffix. just like the picture, create new parquet file will add -0 after fgId(xxx-0-0_xxx), so it can be read if only insert overwrite onece, but if insert overwrite again, will use the same fgId(xxx-0-0), result nothing.

5fa9c60 to
d4e9858
Compare
d4e9858 to
d0099b9
Compare
...di-client-common/src/main/java/org/apache/hudi/exception/HoodieInsertOverwriteException.java
Outdated
Show resolved
Hide resolved
.../main/java/org/apache/hudi/table/action/commit/SparkInsertOverwriteCommitActionExecutor.java
Outdated
Show resolved
Hide resolved
| case CONSISTENT_HASHING: | ||
| return new SparkInsertOverwriteConsistentBucketIndexPartitioner(profile, context, table, config); | ||
| default: | ||
| throw new HoodieNotSupportedException("Unknown bucket index engine type: " + config.getBucketIndexEngineType()); |
There was a problem hiding this comment.
Can we inline all the different handling for getBucketInfo into SparkInsertOverwritePartitioner ? Let's make the code cleaner.
There was a problem hiding this comment.
I move part of them which related to ConsistentBucketIndex to SparkInsertOverwritePartitioner.
And I left other part which related to SimpleBucketIndex in SparkBucketIndexInsertOverwritePartitioner.
Because SimpleBucketIndex and ConsistentBucketIndex are different when creates new BucketInfo.
| return handleInsert(binfo.fileIdPrefix, recordItr); | ||
| } else if (btype.equals(BucketType.UPDATE)) { | ||
| throw new HoodieInsertOverwriteException( | ||
| "Insert overwrite should always use INSERT bucketType, please correct the logical of " + partitioner.getClass().getName()); |
There was a problem hiding this comment.
In which case we can hit the code path for BucketType.UPDATE ?
There was a problem hiding this comment.
This is a protected code to prevent hit this bug again when introduce new partitioner class in the future.
d0099b9 to
a76dc55
Compare
...lient/src/main/java/org/apache/hudi/table/action/commit/SparkInsertOverwritePartitioner.java
Show resolved
Hide resolved
|
|
||
| @Override | ||
| public BucketInfo getBucketInfo(int bucketNumber) { | ||
| String partitionPath = partitionPaths.get(bucketNumber / numBuckets); |
There was a problem hiding this comment.
In HoodieWriteConfig, we can fetch the operation then decides whether it is INSERT_OVERWRITE, then the logic can be moved into SparkBucketIndexPartitioner.
2015eb0 to
7267340
Compare
danny0405
left a comment
There was a problem hiding this comment.
+1, we are good to land once the CI is green
…rt overwrite behavior
7267340 to
f550344
Compare
|
@hudi-bot run azure |
Change Logs
Snapshot query result is wrong after apply insert overwrite to an existed table with simple bucket index.
see detailed in HUDI-5857.
The root cause of the bug is the write handler reuses existed bucket file id for insert overwrite. Besides it use replace commit for insert overwrite operation and mark all the existed bucket file id as replaced.
So the snapshot query result is wrong.
The pr aims to fix this bug by generating new file id for bucket if insert overwrite into bucket index table.
Impact
NA
Risk level (write none, low medium or high below)
NA
Documentation Update
NA
Contributor's checklist